[Apache Kafka] Kafka StreamsでStream処理をしてみる [node.js]
Apache Kafkaとは
Apache Kafkaとは、Linkedinが開発した分散メッセージキューで、
データストリーミング用のプラットフォームです。
メッセージキューとは、システム間でデータのハブととして機能し、
対象データを一時的に保持してくコンポーネントです。
キューをはさむことでシステム間を疎結合にし、通信を非同期化することができます。
AWSでいえば、sqsやkinesisが同様のサービスにあたります。
Apache Kafkaはスケーラビリティや大量データの扱いに長けており、耐障害性もあるスグレモノです。
本稿ではApache Kakfaとnode.js用モジュールのkafka-nodeとkafka-streamsをつかって
node.jsからkafkaへアクセスしてみます。
Amazon Kinesisとの比較
以前はAWSフルマネージド(kinesis)かどうか、という決定的な違いがあったのですが、
2018年11月にAmazon Managed Streaming for Apache Kafka (Amazon MSK)が
リリースされたのでその違いもほぼなくなりました。
※ Amazon MSK : Apache Kafka をストリーミングデータ処理に使用する際のフルマネージド型サービス
ここではkafkaとkinesisの比較をしていますが、
主な違いは
- データ保持期間 : kafka = 無制限 , kinesis = 1日〜7日
- データサイズmax : kafka = default 1MB(設定可能) , kinesis = 1MB
- 依存サービス : kafka = ZooKeeper , kinesis = DynamoDB
というような感じです。
絶対にどちらかしか実現できない機能はありませんが、
最初から大量のストリームデータ配信を想定している場合はkafka、
そうでない場合はkinesisを選択することが多いようです。
参考:
・https://www.ossnews.jp/compare/Apache_Kafka/Amazon_Kinesis
・http://www.itcheerup.net/2019/01/kafka-vs-kinesis/
Apache Kafkaのサンプル
まずはKafkaを動かしてみましょう。
今回はdockerをつかってkafkaを起動し、node.jsでproducer/consumerサンプルの確認をしてみます。
環境
今回使用した動作環境は以下のとおりです。
- OS : MacOS X 10.12.6
- Docker : 18.09.2
- node.js : v11.14.0
Dockerイメージからkafka用コンテナを作成してコンテナにログインします。
$ docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=localhost --env ADVERTISED_PORT=9092 --name test_kafka spotify/kafka $ docker exec -it test_kafka bash
kafkaディレクトリに移動し、topicをtestという名前で作成します。
root@xxxx:/# cd /opt/kafka_2.11-0.10.1.0/ root@xxxx:/# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
topicを作成したらnode.jsでProducer側のプログラム(producer.js)を作成します。
本稿では、node.jsからkafkaにアクセスするためのkafka-nodeモジュールを使用します。
事前にkafka-nodeをnpmでインストールしておきましょう。
% cd path/yuor/node-app % npm install kafka-node
// publisher.js 'use strict'; var kafka = require("kafka-node"); const Producer = kafka.HighLevelProducer; const client = new kafka.KafkaClient({ kafkaHost: "localhost:9092" }); const producer = new Producer(client, { partitionerType: 1 }); producer.on("ready", () => { //プログラム引数で名前と年齢を受け取る const name = process.argv[2]; const age = process.argv[3]; const message = [ { topic: "test", messages: JSON.stringify({name: name, age: age}) } ]; producer.send(message, (err, data) => { if (err) console.log(err); else console.log('send messages'); process.exit(); }); });
次にConsumer(consumer.js)を作成します。
'use strict'; var kafka = require('kafka-node'); const Consumer = kafka.Consumer; const client = new kafka.KafkaClient({kafkaHost: "localhost:9092"}); const consumer = new Consumer( client, [{topic: "test", partision:0}], { groupId: "my-consumer", autoCommit: true, fromOffset: true } ); consumer.on("message", (message, err) => { if (err) console.log("error : " + err); const json = JSON.parse(message.value); console.log("JSON:" + JSON.stringify(json)); console.log("Name:" + json.name); console.log("Age:" + json.age); }); consumer.on('error', function (err) { console.log('error', err); });
producer.jsを実行してkafkaにメッセージを送ります。
$ node producer.js taro 30 send messages
consumerを起動すると、topicに対してメッセージを取得しに行きます。
(先にconsumer.jsを起動していてもメッセージが送られたときに取得する)
$ node consumer.js JSON:{"name":"taro","age":"30"} Name:taro Age:30
これでkafkaの動作確認は終了です。
Kafka Streamsとは
Kafkaの動作確認もできたので、次はKafka Streamsを動かしてみましょう。
Kafka Streamsとは、Apache Kafka v0.10から同梱されているライブラリで、
これを使えばStream処理をある程度簡単に実装できるようになります。
例えば、
「サンプルAのtopicにデータが送られたら、それに対して処理を実行してサンプルBのtopicへ送る」
といった処理が可能になります。
KStreamとKTable
Kafkaに流れてくるStream(key-value形式)には2つの種類があるという考えのもと、
「KStream」「KTable」という2つのStreamタイプを使い分けることができます。
1つ目はKStream(record stream)と呼ばれる、
Kafkaからうけとったデータがそのまま追加されるタイプです。
ここにある例でいうと、↓にあるような2件のStreamデータが流れてきたとします。
// { user(key) : count } {"alice" : 1} {"alice" : 3}
このデータはユーザーごとのカウント数を表しており、KStreamを使用しているならcountは4となります。
2つ目はKTable(changelog stream)で、
もしすでに同じキーのデータが存在するならデータが更新されます。
上記2件のStreamデータをKTableで処理した場合、countは3となります。
なお、KTableにおいてNULLを持つデータは、そのデータのキーに対する削除を表します。
その他Kafakaの特徴
他にもKafkaではいろいろな機能を持っています。
Streamとテーブルのjoinができたり、集約(max/min/avg/sum) も可能です。
また、window関数を使用して任意の期間についてStreamデータをグルーピングすることも可能です。
join/aggregation/windowingについては公式ドキュメント等をご確認ください。
node-kafka-streamsを使ったサンプル
では、kafka-streamsを使った簡単なサンプルを作成してみましょう。
ここにあるwordCountを少しかえて試してみます。
ここではinput-topicとoutput-topicの2つのtopicを使用するので、
コンテナにログインして作成しましょう。
root@xxxx:/# cd /opt/kafka_2.11-0.10.1.0/ root@xxxx:/# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic input-topic root@xxxx:/# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic output-topic
次に、Kafka Streamsを使用するためのモジュール、kafka-streamsもnpmでインストールします。
% cd path/yuor/node-app % npm install kafka-streams
Kafka Streamsを使ったサンプルです。
ここではinput-topicにメッセージを送り、そのメッセージの数をcountしてoutput-topicに送ります。
例えば、
"ftuit banana"
"ftuit orange"
"ftuit apple"
と3つのメッセージをinput-topicにおくった場合、
"fluit 3"
というふうに、メッセージの最初の文字列(ここだとfluit)をキーとして
その数をoutpu-topicに送ります。
"use strict"; //wordCount.js const { KafkaStreams } = require("kafka-streams"); const { nativeConfig: config } = require("./config.js"); const keyMapperEtl = (kafkaMessage) => { const value = kafkaMessage.value.toString("utf8"); console.log("message : " + kafkaMessage); const elements = value.toLowerCase().split(" "); return { someField: elements[0], }; }; const kafkaStreams = new KafkaStreams(config); kafkaStreams.on("error", (error) => { console.log("Error occured:", error.message); }); const stream = kafkaStreams.getKStream(); //input-topicから取得したデータを //キー毎にカウントしてoutput-topicに送る(count >= 3のキー) stream .from("input-topic") .map(keyMapperEtl) .countByKey("someField", "count") .filter(kv => kv.count >= 3) .map(kv => kv.someField + " " + kv.count) .tap(kv => console.log(kv)) .to("output-topic"); Promise.all([ stream.start() ]).then(() => { console.log("started.."); // 50秒したらStreamをclose setTimeout(() => { kafkaStreams.closeAll(); console.log("stopped.."); }, 50000); });
Streamの設定ファイルです。
"use strict"; //config.js //dont use these settings for production, it will set your broker on fire.. const batchOptions = { batchSize: 5, commitEveryNBatch: 1, concurrency: 1, commitSync: false, noBatchCommits: false }; const nativeConfig = { noptions: { "metadata.broker.list": "localhost:9092", //native client requires broker hosts to connect to "group.id": "kafka-streams-test-native", "client.id": "kafka-streams-test-name-native", "event_cb": true, "compression.codec": "snappy", "api.version.request": true, "socket.keepalive.enable": true, "socket.blocking.max.ms": 100, "enable.auto.commit": false, "auto.commit.interval.ms": 100, "heartbeat.interval.ms": 250, "retry.backoff.ms": 250, "fetch.min.bytes": 100, "fetch.message.max.bytes": 2 * 1024 * 1024, "queued.min.messages": 100, "fetch.error.backoff.ms": 100, "queued.max.messages.kbytes": 50, "fetch.wait.max.ms": 1000, "queue.buffering.max.ms": 1000, "batch.num.messages": 10000 }, tconf: { "auto.offset.reset": "earliest", "request.required.acks": 1 }, batchOptions }; module.exports = { nativeConfig };
wordCount.jsを起動し、input-topicに対して下記のようなメッセージをおくってみると、
"fruit banana"
"fruit orange"
"fruit apple"
のような同一キーを指定したメッセージを3つ送ると、
output-topicに対して
"fruit 3"
といったキーのカウント数を送ります。
最後に
今回はApache KafkaとKafka Streamsの動きを簡単に確認してみました。
ストリーム処理が非常に簡単に動かすことができたと思います。
他にも使えそうな機能が多くあるので、確認してみてください。
なお、動作確認がおわったら、不必要なコンテナはdocker rmで削除しておきましょう。
# コンテナを削除 $ docker rm test_kafka
参考サイト
- https://qiita.com/41semicolon/items/60f92a1db6dfce4303c5
- https://qiita.com/sigmalist/items/5a26ab519cbdf1e07af3
- https://qiita.com/41semicolon/items/60f92a1db6dfce4303c5
- https://qiita.com/mkyz08/items/a3b866c46ca49c52e647
- http://pppurple.hatenablog.com/entry/2019/03/28/235810
- https://www.slideshare.net/techblogyahoo/kafka-streams-kafkajp
- https://github.com/SOHU-Co/kafka-node
- https://github.com/tulios/kafkajs
- https://qiita.com/minarai/items/f571db36a19806aee491
- https://github.com/nodefluent/kafka-streams
- https://vicki.substack.com/p/you-dont-need-kafka